package com.lhl.mq.mqClient;

import com.lhl.mq.common.*;
import com.lhl.mq.mqServer.core.BasicProperties;
import com.lhl.mq.mqServer.core.ExchangeType;
import org.springframework.boot.autoconfigure.mail.MailProperties;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Client-side channel bound to a {@link Connection}.
 *
 * <p>Each request method serializes an arguments object, sends it through the connection as a
 * {@code Request}, and then blocks until a {@code BasicReturns} carrying the same rid has been
 * handed to {@link #putReturn(BasicReturns)}.
 *
 * <p>Waiting callers and {@link #putReturn(BasicReturns)} coordinate through this object's
 * monitor ({@code wait}/{@code notifyAll}).
 */
public class Channel {
    private String channelId;
    // The connection this channel belongs to.
    private Connection connection;
    // Responses received from the server that have not yet been claimed by a waiting caller.
    // key: rid, value: the BasicReturns carrying that rid.
    private ConcurrentHashMap<String, BasicReturns> basicReturn = new ConcurrentHashMap<>();
    // Callback registered when this channel subscribes to a queue; it is meant to be invoked
    // when a message for that queue comes back.
    // By convention a channel holds at most one callback.
    private Consumer consumer;

    /**
     * Creates a channel object; it does not talk to the server (see {@link #createChannel()}).
     *
     * @param channelId  identifier of this channel
     * @param connection connection used to send this channel's requests
     */
    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }

    /**
     * Tells the server that the client created this channel (request type {@code 0x1}) and waits
     * for the response.
     *
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     */
    public boolean createChannel() throws IOException {
        BasicArguments arguments = new BasicAckArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        return sendAndWait(0x1, arguments.getRid(), BinaryTool.toBytes(arguments));
    }

    /**
     * Asks the server to delete (close) this channel (request type {@code 0x2}) and waits for the
     * response.
     *
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     */
    public boolean deleteChannel() throws IOException {
        BasicArguments arguments = new BasicAckArguments();
        arguments.setChannelId(channelId);
        arguments.setRid(generateRid());
        return sendAndWait(0x2, arguments.getRid(), BinaryTool.toBytes(arguments));
    }

    /**
     * Declares an exchange (request type {@code 0x3}) and waits for the response.
     *
     * @param exchangeName name of the exchange
     * @param exchangeType type of the exchange
     * @param durable      forwarded to the server as the exchange's durable flag
     * @param autoDelete   forwarded to the server as the exchange's auto-delete flag
     * @param arguments    extra arguments forwarded to the server unchanged
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     */
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
                                Map<String,Object>arguments) throws IOException {
        ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
        exchangeDeclareArguments.setRid(generateRid());
        exchangeDeclareArguments.setChannelId(channelId);
        exchangeDeclareArguments.setExchangeName(exchangeName);
        exchangeDeclareArguments.setExchangeType(exchangeType);
        exchangeDeclareArguments.setDurable(durable);
        exchangeDeclareArguments.setAutoDelete(autoDelete);
        exchangeDeclareArguments.setArguments(arguments);
        return sendAndWait(0x3, exchangeDeclareArguments.getRid(), BinaryTool.toBytes(exchangeDeclareArguments));
    }

    /**
     * Deletes an exchange (request type {@code 0x4}) and waits for the response.
     *
     * @param exchangeName name of the exchange to delete
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     */
    public boolean exchangeDelete(String exchangeName) throws IOException {
        ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setExchangeName(exchangeName);
        return sendAndWait(0x4, arguments.getRid(), BinaryTool.toBytes(arguments));
    }

    /**
     * Declares a queue (request type {@code 0x5}) and waits for the response.
     *
     * @param queueName  name of the queue
     * @param durable    forwarded to the server as the queue's durable flag
     * @param exclusive  forwarded to the server as the queue's exclusive flag
     * @param autoDelete forwarded to the server as the queue's auto-delete flag
     * @param arguments  extra arguments forwarded to the server unchanged
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     */
    public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,
                                boolean autoDelete,Map<String,Object>arguments) throws IOException {
        QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
        queueDeclareArguments.setRid(generateRid());
        queueDeclareArguments.setChannelId(channelId);
        queueDeclareArguments.setQueueName(queueName);
        queueDeclareArguments.setDurable(durable);
        queueDeclareArguments.setExclusive(exclusive);
        queueDeclareArguments.setAutoDelete(autoDelete);
        queueDeclareArguments.setArguments(arguments);
        return sendAndWait(0x5, queueDeclareArguments.getRid(), BinaryTool.toBytes(queueDeclareArguments));
    }

    /**
     * Deletes a queue (request type {@code 0x6}) and waits for the response.
     *
     * @param queueName name of the queue to delete
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     */
    public boolean queueDelete(String queueName) throws IOException {
        QueueDeleteArguments arguments = new QueueDeleteArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        return sendAndWait(0x6, arguments.getRid(), BinaryTool.toBytes(arguments));
    }

    /**
     * Creates a binding between a queue and an exchange (request type {@code 0x7}) and waits for
     * the response.
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the exchange
     * @param bindingKey   binding key forwarded to the server
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     */
    public boolean queueBind(String queueName,String exchangeName,String bindingKey) throws IOException {
        QueueBindArguments arguments = new QueueBindArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        arguments.setExchangeName(exchangeName);
        arguments.setBidingKey(bindingKey);
        return sendAndWait(0x7, arguments.getRid(), BinaryTool.toBytes(arguments));
    }

    /**
     * Deletes a binding (request type {@code 0x8}) and waits for the response.
     *
     * <p>Note the parameter order: exchange first, queue second (the opposite of
     * {@link #queueBind(String, String, String)}).
     *
     * @param exchangeName name of the exchange
     * @param queueName    name of the queue
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     */
    public boolean queueUnBind(String exchangeName,String queueName) throws IOException {
        QueueUnBindArguments arguments = new QueueUnBindArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setExchangeName(exchangeName);
        arguments.setQueueName(queueName);
        return sendAndWait(0x8, arguments.getRid(), BinaryTool.toBytes(arguments));
    }

    /**
     * Publishes a message (request type {@code 0x9}) and waits for the response.
     *
     * @param exchangeName    name of the target exchange
     * @param routingKey      routing key forwarded to the server
     * @param basicProperties message properties forwarded to the server
     * @param body            message body
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     */
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties,
                                byte[] body) throws IOException {
        BasicPublishArguments arguments = new BasicPublishArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setExchangeName(exchangeName);
        arguments.setBasicProperties(basicProperties);
        arguments.setBody(body);
        arguments.setRoutingKey(routingKey);
        return sendAndWait(0x9, arguments.getRid(), BinaryTool.toBytes(arguments));
    }

    /**
     * Subscribes this channel to a queue (request type {@code 0xa}) and waits for the response.
     *
     * <p>The callback is stored before the request is sent. If the request fails (an exception
     * is thrown or the server response is not ok) the callback is cleared again, so a later
     * call can retry instead of being rejected as a duplicate.
     *
     * @param queueName name of the queue to subscribe to
     * @param autoAck   forwarded to the server as the auto-ack flag
     * @param consumer  callback to register on this channel
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     * @throws MqException if this channel already has a callback registered
     */
    public boolean basicConsume(String queueName,boolean autoAck,Consumer consumer) throws IOException, MqException {
        if (this.consumer != null){
            throw new MqException("[Channel] 该channel已经被设置过消费消息的回调了，不能重复设置！");
        }
        this.consumer = consumer;

        boolean subscribed = false;
        try {
            BasicConsumeArguments arguments = new BasicConsumeArguments();
            arguments.setRid(generateRid());
            // Every other request carries the channel id; this one previously did not.
            // NOTE(review): presumably the server needs it to address the response - confirm
            // against the server-side handler.
            arguments.setChannelId(channelId);
            // The channel id doubles as the consumer tag.
            arguments.setConsumerTag(channelId);
            arguments.setQueueName(queueName);
            arguments.setAutoAck(autoAck);
            subscribed = sendAndWait(0xa, arguments.getRid(), BinaryTool.toBytes(arguments));
            return subscribed;
        } finally {
            if (!subscribed) {
                // Roll back so a failed subscription does not block every later attempt.
                this.consumer = null;
            }
        }
    }

    /**
     * Acknowledges a message (request type {@code 0xb}) and waits for the response.
     *
     * @param queueName name of the queue the message came from
     * @param messageId id of the message being acknowledged
     * @return the {@code isOk()} flag of the server response
     * @throws IOException if serializing or writing the request fails
     */
    public boolean basicAck(String queueName,String messageId) throws IOException {
        BasicAckArguments arguments = new BasicAckArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setMessageId(messageId);
        arguments.setQueueName(queueName);
        return sendAndWait(0xb, arguments.getRid(), BinaryTool.toBytes(arguments));
    }

    // Wraps an already-serialized payload in a Request of the given type, writes it to the
    // connection, then blocks until the response with the given rid arrives; returns its isOk().
    private boolean sendAndWait(int type, String rid, byte[] payload) throws IOException {
        Request request = new Request();
        request.setType(type);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        return waitResult(rid).isOk();
    }

    /**
     * Blocks until a response with the given rid has been stored by
     * {@link #putReturn(BasicReturns)}, then removes it from the pending map and returns it.
     *
     * <p>The lookup and the {@code wait()} happen under the same monitor that
     * {@link #putReturn(BasicReturns)} uses, so a response stored between the lookup and the
     * wait cannot be missed. If the thread is interrupted while waiting, it keeps waiting (this
     * method cannot throw {@code InterruptedException}) and the interrupt status is restored
     * before returning.
     *
     * @param rid id of the request whose response is awaited
     * @return the response carrying that rid
     */
    public BasicReturns waitResult(String rid){
        boolean interrupted = false;
        BasicReturns basicReturns;
        synchronized (this) {
            // remove() claims the response atomically: lookup and deletion in one step.
            while ((basicReturns = basicReturn.remove(rid)) == null) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag; remember it and re-assert it once we are done.
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return basicReturns;
    }

    /**
     * Stores a response under its rid and wakes every thread blocked in
     * {@link #waitResult(String)} so each can re-check for its own rid.
     *
     * @param basicReturns response to hand over to the waiting caller
     */
    public void putReturn(BasicReturns basicReturns) {
        synchronized (this){
            basicReturn.put(basicReturns.getRid(),basicReturns);
            // Wake all waiters; only the one whose rid matches will find its response.
            notifyAll();
        }
    }

    /**
     * Generates a new request id of the form {@code "R-" + random UUID}.
     *
     * @return the generated rid
     */
    public String generateRid(){
        return "R-" + UUID.randomUUID();
    }

    public String getChannelId() {
        return channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public ConcurrentHashMap<String, BasicReturns> getBasicReturn() {
        return basicReturn;
    }

    public void setBasicReturn(ConcurrentHashMap<String, BasicReturns> basicReturn) {
        this.basicReturn = basicReturn;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }


}
